{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "84b22f68",
   "metadata": {},
   "source": [
    "# Batch consuming"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "7a35b05c",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | hide\n",
    "\n",
    "import asyncio\n",
    "import platform\n",
    "\n",
    "import asyncer\n",
    "from IPython.display import Markdown as md\n",
    "\n",
    "from fastkafka._components._subprocess import terminate_asyncio_process\n",
    "from fastkafka._testing.apache_kafka_broker import run_and_match\n",
    "from fastkafka.testing import ApacheKafkaBroker, run_script_and_cancel"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "8c655e4f",
   "metadata": {},
   "source": [
    "To consume data in batches, annotate the message parameter of your `@consumes` function as a `list` of messages. The consumer will then call your function with a batch of messages, where each batch is consumed from a single partition. Let's demonstrate that now."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "18535f2f",
   "metadata": {},
   "source": [
    "## Consume function with batching\n",
    "\n",
    "To consume messages in batches, wrap your message type in a list (for example, `List[HelloWorld]`) and the `@consumes` decorator will take care of the rest. Your consuming function will then be called with batches of messages grouped by partition."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "d09190cd",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/markdown": [
       "```python\n",
       "@app.consumes(auto_offset_reset=\"earliest\")\n",
       "async def on_hello_world(msg: List[HelloWorld]):\n",
       "    logger.info(f\"Got msg batch: {msg}\")\n",
       "\n",
       "```"
      ],
      "text/plain": [
       "<IPython.core.display.Markdown object>"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# | echo: false\n",
    "\n",
    "consumes_decorator_batch = \"\"\"@app.consumes(auto_offset_reset=\"earliest\")\n",
    "async def on_hello_world(msg: List[HelloWorld]):\n",
    "    logger.info(f\"Got msg batch: {msg}\")\n",
    "\"\"\"\n",
    "md(f\"```python\\n{consumes_decorator_batch}\\n```\")"
   ]
  },
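  {
   "cell_type": "markdown",
   "id": "5c2e9a71",
   "metadata": {},
   "source": [
    "Inside the function, the batch is a regular Python list, so you can iterate over it or check its length. The following is a minimal sketch rather than part of the FastKafka API: `summarize_batch` is a hypothetical helper that you could call from `on_hello_world`, and the `HelloWorld` model is a simplified copy of the one used in this guide.\n",
    "\n",
    "```python\n",
    "from typing import List\n",
    "\n",
    "from pydantic import BaseModel\n",
    "\n",
    "\n",
    "class HelloWorld(BaseModel):\n",
    "    msg: str\n",
    "\n",
    "\n",
    "def summarize_batch(batch: List[HelloWorld]) -> str:\n",
    "    # Hypothetical helper: collect the text of every message in the batch\n",
    "    texts = [message.msg for message in batch]\n",
    "    return f\"{len(batch)} messages: {texts}\"\n",
    "\n",
    "\n",
    "batch = [HelloWorld(msg=\"Hello world\"), HelloWorld(msg=\"Hello world\")]\n",
    "print(summarize_batch(batch))  # prints: 2 messages: ['Hello world', 'Hello world']\n",
    "```"
   ]
  },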
  {
   "cell_type": "markdown",
   "id": "f66b68d7",
   "metadata": {},
   "source": [
    "## App example\n",
    "\n",
    "We will modify the app example from the [@consumes basics](/docs/guides/Guide_11_Consumes_Basics.md) guide to consume `HelloWorld` messages in batches. The final app will look like this (make sure you replace `<url_of_your_kafka_bootstrap_server>` and `<port_of_your_kafka_bootstrap_server>` with the actual values):"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "673c7f8a",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | hide\n",
    "\n",
    "app = \"\"\"\n",
    "import asyncio\n",
    "from typing import List\n",
    "from pydantic import BaseModel, Field\n",
    "\n",
    "from fastkafka import FastKafka\n",
    "from fastkafka._components.logger import get_logger\n",
    "\n",
    "logger = get_logger(__name__)\n",
    "\n",
    "class HelloWorld(BaseModel):\n",
    "    msg: str = Field(\n",
    "        ...,\n",
    "        example=\"Hello\",\n",
    "        description=\"Demo hello world message\",\n",
    "    )\n",
    "\n",
    "kafka_brokers = {\n",
    "    \"demo_broker\": {\n",
    "        \"url\": \"<url_of_your_kafka_bootstrap_server>\",\n",
    "        \"description\": \"local demo kafka broker\",\n",
    "        \"port\": \"<port_of_your_kafka_bootstrap_server>\",\n",
    "    }\n",
    "}\n",
    "\n",
    "app = FastKafka(kafka_brokers=kafka_brokers)\n",
    "\n",
    "\"\"\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "2abb4c3d",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/markdown": [
       "```python\n",
       "\n",
       "import asyncio\n",
       "from typing import List\n",
       "from pydantic import BaseModel, Field\n",
       "\n",
       "from fastkafka import FastKafka\n",
       "from fastkafka._components.logger import get_logger\n",
       "\n",
       "logger = get_logger(__name__)\n",
       "\n",
       "class HelloWorld(BaseModel):\n",
       "    msg: str = Field(\n",
       "        ...,\n",
       "        example=\"Hello\",\n",
       "        description=\"Demo hello world message\",\n",
       "    )\n",
       "\n",
       "kafka_brokers = {\n",
       "    \"demo_broker\": {\n",
       "        \"url\": \"<url_of_your_kafka_bootstrap_server>\",\n",
       "        \"description\": \"local demo kafka broker\",\n",
       "        \"port\": \"<port_of_your_kafka_bootstrap_server>\",\n",
       "    }\n",
       "}\n",
       "\n",
       "app = FastKafka(kafka_brokers=kafka_brokers)\n",
       "\n",
       "@app.consumes(auto_offset_reset=\"earliest\")\n",
       "async def on_hello_world(msg: List[HelloWorld]):\n",
       "    logger.info(f\"Got msg batch: {msg}\")\n",
       "\n",
       "```"
      ],
      "text/plain": [
       "<IPython.core.display.Markdown object>"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# | echo: false\n",
    "\n",
    "batch_example = app + consumes_decorator_batch\n",
    "\n",
    "md(f\"```python\\n{batch_example}\\n```\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "d4ec6dab",
   "metadata": {},
   "source": [
    "## Send the messages to the Kafka topic"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "c81e65bc",
   "metadata": {},
   "source": [
    "Let's send a couple of `HelloWorld` messages to the *hello_world* topic and check whether our consumer application logs the received batch. In your terminal, run the following command at least twice to put multiple messages on the topic:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "6ef181f6",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/markdown": [
       "```shell\n",
       "echo { ^\"msg^\": ^\"Hello world^\" } | kafka-console-producer.bat --topic=hello_world --bootstrap-server=<addr_of_your_kafka_bootstrap_server>\n",
       "```"
      ],
      "text/plain": [
       "<IPython.core.display.Markdown object>"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# | echo: false\n",
    "\n",
    "script_extension = \".bat\" if platform.system() == \"Windows\" else \".sh\"\n",
    "escape_char = \"^\" if platform.system() == \"Windows\" else \"\\\\\"\n",
    "\n",
    "kafka_msg = 'echo {{ {escape_char}\"msg{escape_char}\": {escape_char}\"Hello world{escape_char}\" }}'.format(escape_char=escape_char)\n",
    "\n",
    "producer_cmd = f'{kafka_msg} | kafka-console-producer'+script_extension+' --topic=hello_world --bootstrap-server=<addr_of_your_kafka_bootstrap_server>'\n",
    "md(f\"```shell\\n{producer_cmd}\\n```\")"
   ]
  },
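  {
   "cell_type": "markdown",
   "id": "a4d07e3b",
   "metadata": {},
   "source": [
    "If you prefer to send the test messages from Python instead of the console producer, a sketch like the one below may work when run as a standalone script. It assumes the `aiokafka` package is installed and that a broker is reachable at `localhost:9092`; both the address and the `send_hello` helper are assumptions made for illustration, so adjust them to your setup.\n",
    "\n",
    "```python\n",
    "import asyncio\n",
    "\n",
    "from aiokafka import AIOKafkaProducer\n",
    "\n",
    "\n",
    "async def send_hello(bootstrap_server: str, count: int = 2) -> None:\n",
    "    producer = AIOKafkaProducer(bootstrap_servers=bootstrap_server)\n",
    "    await producer.start()\n",
    "    try:\n",
    "        for _ in range(count):\n",
    "            # Send the same JSON message as the console command\n",
    "            await producer.send_and_wait(\"hello_world\", b'{\"msg\": \"Hello world\"}')\n",
    "    finally:\n",
    "        # Always stop the producer so pending messages are flushed\n",
    "        await producer.stop()\n",
    "\n",
    "\n",
    "# Assumed address; replace it with your own bootstrap server\n",
    "asyncio.run(send_hello(\"localhost:9092\"))\n",
    "```"
   ]
  },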
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "01604778",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/markdown": [
       "Now we can run the app. Copy the code of the example app into consumer_example.py and run it with:\n",
       "```shell\n",
       "fastkafka run --num-workers=1 --kafka-broker=demo_broker consumer_example:app\n",
       "```"
      ],
      "text/plain": [
       "<IPython.core.display.Markdown object>"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# | echo: false\n",
    "\n",
    "script_file = \"consumer_example.py\"\n",
    "filename = script_file.split(\".py\")[0]\n",
    "cmd = f\"fastkafka run --num-workers=1 --kafka-broker=demo_broker {filename}:app\"\n",
    "md(\n",
    "    f\"Now we can run the app. Copy the code of the example app into {script_file} and run it with:\\n```shell\\n{cmd}\\n```\"\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a66904c8",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | hide\n",
    "\n",
    "\n",
    "with ApacheKafkaBroker(\n",
    "    topics=[\"hello_world\"], apply_nest_asyncio=True, listener_port=12092\n",
    ") as bootstrap_server:\n",
    "    async with asyncer.create_task_group() as task_group:\n",
    "        server_url = bootstrap_server.split(\":\")[0]\n",
    "        server_port = bootstrap_server.split(\":\")[1]\n",
    "\n",
    "        producer_tasks = [task_group.soonify(asyncio.create_subprocess_shell)(\n",
    "            cmd=producer_cmd.replace(\n",
    "                \"<addr_of_your_kafka_bootstrap_server>\", bootstrap_server\n",
    "            ),\n",
    "            stdout=asyncio.subprocess.PIPE,\n",
    "            stderr=asyncio.subprocess.PIPE,\n",
    "        ) for _ in range(2)]\n",
    "        \n",
    "        await asyncio.sleep(5)\n",
    "        \n",
    "        consumer_task = task_group.soonify(run_script_and_cancel)(\n",
    "            script=batch_example.replace(\n",
    "                \"<url_of_your_kafka_bootstrap_server>\", server_url\n",
    "            ).replace(\"<port_of_your_kafka_bootstrap_server>\", server_port),\n",
    "            script_file=script_file,\n",
    "            cmd=cmd,\n",
    "            cancel_after=20,\n",
    "        )\n",
    "\n",
    "assert \"Got msg batch: [HelloWorld(msg='Hello world'), HelloWorld(msg='Hello world')]\" in consumer_task.value[1].decode(\"UTF-8\"), consumer_task.value[1].decode(\"UTF-8\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "e86e202e",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | echo: false\n",
    "\n",
    "print(consumer_task.value[1].decode(\"UTF-8\"))"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "9292901c",
   "metadata": {},
   "source": [
    "You should see your Kafka messages being logged in batches by your consumer."
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "python3",
   "language": "python",
   "name": "python3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
